/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.processors.gcp.storage;

import com.google.api.gax.paging.Page;
import com.google.cloud.storage.Acl;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.common.collect.ImmutableList;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;

import java.io.IOException;
import java.io.OutputStream;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.apache.nifi.processors.gcp.storage.StorageAttributes.BUCKET_ATTR;
import static org.apache.nifi.processors.gcp.storage.StorageAttributes.BUCKET_DESC;
import static org.apache.nifi.processors.gcp.storage.StorageAttributes.CACHE_CONTROL_ATTR;
import static org.apache.nifi.processors.gcp.storage.StorageAttributes.CACHE_CONTROL_DESC;
import static org.apache.nifi.processors.gcp.storage.StorageAttributes.COMPONENT_COUNT_ATTR;
import static org.apache.nifi.processors.gcp.storage.StorageAttributes.COMPONENT_COUNT_DESC;
import static org.apache.nifi.processors.gcp.storage.StorageAttributes.CONTENT_DISPOSITION_ATTR;
import static org.apache.nifi.processors.gcp.storage.StorageAttributes.CONTENT_DISPOSITION_DESC;
import static org.apache.nifi.processors.gcp.storage.StorageAttributes.CONTENT_ENCODING_ATTR;
import static org.apache.nifi.processors.gcp.storage.StorageAttributes.CONTENT_ENCODING_DESC;
import static org.apache.nifi.processors.gcp.storage.StorageAttributes.CONTENT_LANGUAGE_ATTR;
import static org.apache.nifi.processors.gcp.storage.StorageAttributes.CONTENT_LANGUAGE_DESC;
import static org.apache.nifi.processors.gcp.storage.StorageAttributes.CRC32C_ATTR;
import static org.apache.nifi.processors.gcp.storage.StorageAttributes.CRC32C_DESC;
import static org.apache.nifi.processors.gcp.storage.StorageAttributes.CREATE_TIME_ATTR;
import static org.apache.nifi.processors.gcp.storage.StorageAttributes.CREATE_TIME_DESC;
import static org.apache.nifi.processors.gcp.storage.StorageAttributes.ENCRYPTION_ALGORITHM_ATTR;
import static org.apache.nifi.processors.gcp.storage.StorageAttributes.ENCRYPTION_ALGORITHM_DESC;
import static org.apache.nifi.processors.gcp.storage.StorageAttributes.ENCRYPTION_SHA256_ATTR;
import static org.apache.nifi.processors.gcp.storage.StorageAttributes.ENCRYPTION_SHA256_DESC;
import static org.apache.nifi.processors.gcp.storage.StorageAttributes.ETAG_ATTR;
import static org.apache.nifi.processors.gcp.storage.StorageAttributes.ETAG_DESC;
import static org.apache.nifi.processors.gcp.storage.StorageAttributes.GENERATED_ID_ATTR;
import static org.apache.nifi.processors.gcp.storage.StorageAttributes.GENERATED_ID_DESC;
import static org.apache.nifi.processors.gcp.storage.StorageAttributes.GENERATION_ATTR;
import static org.apache.nifi.processors.gcp.storage.StorageAttributes.GENERATION_DESC;
import static org.apache.nifi.processors.gcp.storage.StorageAttributes.KEY_ATTR;
import static org.apache.nifi.processors.gcp.storage.StorageAttributes.KEY_DESC;
import static org.apache.nifi.processors.gcp.storage.StorageAttributes.MD5_ATTR;
import static org.apache.nifi.processors.gcp.storage.StorageAttributes.MD5_DESC;
import static org.apache.nifi.processors.gcp.storage.StorageAttributes.MEDIA_LINK_ATTR;
import static org.apache.nifi.processors.gcp.storage.StorageAttributes.MEDIA_LINK_DESC;
import static org.apache.nifi.processors.gcp.storage.StorageAttributes.METAGENERATION_ATTR;
import static org.apache.nifi.processors.gcp.storage.StorageAttributes.METAGENERATION_DESC;
import static org.apache.nifi.processors.gcp.storage.StorageAttributes.OWNER_ATTR;
import static org.apache.nifi.processors.gcp.storage.StorageAttributes.OWNER_DESC;
import static org.apache.nifi.processors.gcp.storage.StorageAttributes.OWNER_TYPE_ATTR;
import static org.apache.nifi.processors.gcp.storage.StorageAttributes.OWNER_TYPE_DESC;
import static org.apache.nifi.processors.gcp.storage.StorageAttributes.SIZE_ATTR;
import static org.apache.nifi.processors.gcp.storage.StorageAttributes.SIZE_DESC;
import static org.apache.nifi.processors.gcp.storage.StorageAttributes.UPDATE_TIME_ATTR;
import static org.apache.nifi.processors.gcp.storage.StorageAttributes.UPDATE_TIME_DESC;
import static org.apache.nifi.processors.gcp.storage.StorageAttributes.URI_ATTR;
import static org.apache.nifi.processors.gcp.storage.StorageAttributes.URI_DESC;

/**
 * List objects in a google cloud storage bucket by object name pattern.
 */
@PrimaryNodeOnly
@TriggerSerially
@TriggerWhenEmpty
@InputRequirement(Requirement.INPUT_FORBIDDEN)
@Tags({"google cloud", "google", "storage", "gcs", "list"})
@CapabilityDescription("Retrieves a listing of objects from an GCS bucket. For each object that is listed, creates a FlowFile that represents "
        + "the object so that it can be fetched in conjunction with FetchGCSObject. This Processor is designed to run on Primary Node only "
        + "in a cluster. If the primary node changes, the new Primary Node will pick up where the previous node left off without duplicating "
        + "all of the data.")
@Stateful(scopes = Scope.CLUSTER, description = "After performing a listing of keys, the timestamp of the newest key is stored, "
        + "along with the keys that share that same timestamp. This allows the Processor to list only keys that have been added or modified after "
        + "this date the next time that the Processor is run. State is stored across the cluster so that this Processor can be run on Primary Node only and if a new Primary "
        + "Node is selected, the new node can pick up where the previous node left off, without duplicating the data.")
@SeeAlso({PutGCSObject.class, DeleteGCSObject.class, FetchGCSObject.class})
@WritesAttributes({
        @WritesAttribute(attribute = "filename", description = "The name of the file"),
        @WritesAttribute(attribute = BUCKET_ATTR, description = BUCKET_DESC),
        @WritesAttribute(attribute = KEY_ATTR, description = KEY_DESC),
        @WritesAttribute(attribute = SIZE_ATTR, description = SIZE_DESC),
        @WritesAttribute(attribute = CACHE_CONTROL_ATTR, description = CACHE_CONTROL_DESC),
        @WritesAttribute(attribute = COMPONENT_COUNT_ATTR, description = COMPONENT_COUNT_DESC),
        @WritesAttribute(attribute = CONTENT_DISPOSITION_ATTR, description = CONTENT_DISPOSITION_DESC),
        @WritesAttribute(attribute = CONTENT_ENCODING_ATTR, description = CONTENT_ENCODING_DESC),
        @WritesAttribute(attribute = CONTENT_LANGUAGE_ATTR, description = CONTENT_LANGUAGE_DESC),
        @WritesAttribute(attribute = "mime.type", description = "The MIME/Content-Type of the object"),
        @WritesAttribute(attribute = CRC32C_ATTR, description = CRC32C_DESC),
        @WritesAttribute(attribute = CREATE_TIME_ATTR, description = CREATE_TIME_DESC),
        @WritesAttribute(attribute = UPDATE_TIME_ATTR, description = UPDATE_TIME_DESC),
        @WritesAttribute(attribute = ENCRYPTION_ALGORITHM_ATTR, description = ENCRYPTION_ALGORITHM_DESC),
        @WritesAttribute(attribute = ENCRYPTION_SHA256_ATTR, description = ENCRYPTION_SHA256_DESC),
        @WritesAttribute(attribute = ETAG_ATTR, description = ETAG_DESC),
        @WritesAttribute(attribute = GENERATED_ID_ATTR, description = GENERATED_ID_DESC),
        @WritesAttribute(attribute = GENERATION_ATTR, description = GENERATION_DESC),
        @WritesAttribute(attribute = MD5_ATTR, description = MD5_DESC),
        @WritesAttribute(attribute = MEDIA_LINK_ATTR, description = MEDIA_LINK_DESC),
        @WritesAttribute(attribute = METAGENERATION_ATTR, description = METAGENERATION_DESC),
        @WritesAttribute(attribute = OWNER_ATTR, description = OWNER_DESC),
        @WritesAttribute(attribute = OWNER_TYPE_ATTR, description = OWNER_TYPE_DESC),
        @WritesAttribute(attribute = URI_ATTR, description = URI_DESC)
})
public class ListGCSBucket extends AbstractGCSProcessor {
    public static final PropertyDescriptor BUCKET = new PropertyDescriptor
            .Builder().name("gcs-bucket")
            .displayName("Bucket")
            .description(BUCKET_DESC)
            .required(true)
            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
            .build();

    public static final PropertyDescriptor PREFIX = new PropertyDescriptor.Builder()
            .name("gcs-prefix")
            .displayName("Prefix")
            .description("The prefix used to filter the object list. In most cases, it should end with a forward slash ('/').")
            .required(false)
            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
            .build();

    public static final PropertyDescriptor USE_GENERATIONS = new PropertyDescriptor.Builder()
            .name("gcs-use-generations")
            .displayName("Use Generations")
            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
            .required(true)
            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
            .allowableValues("true", "false")
            .defaultValue("false")
            .description("Specifies whether to use GCS Generations, if applicable.  If false, only the latest version of each object will be returned.")
            .build();

    public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
        .name("record-writer")
        .displayName("Record Writer")
        .description("Specifies the Record Writer to use for creating the listing. If not specified, one FlowFile will be created for each entity that is listed. If the Record Writer is specified, " +
            "all entities will be written to a single FlowFile instead of adding attributes to individual FlowFiles.")
        .required(false)
        .identifiesControllerService(RecordSetWriterFactory.class)
        .build();


    @Override
    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return ImmutableList.<PropertyDescriptor>builder()
            .add(BUCKET)
            .add(RECORD_WRITER)
            .addAll(super.getSupportedPropertyDescriptors())
            .add(PREFIX)
            .add(USE_GENERATIONS)
            .build();
    }

    private static final Set<Relationship> relationships = Collections.singleton(REL_SUCCESS);

    @Override
    public Set<Relationship> getRelationships() {
        return relationships;
    }

    // State tracking
    public static final String CURRENT_TIMESTAMP = "currentTimestamp";
    public static final String CURRENT_KEY_PREFIX = "key-";
    private volatile long currentTimestamp = 0L;
    private final Set<String> currentKeys = Collections.synchronizedSet(new HashSet<>());


    private Set<String> extractKeys(final StateMap stateMap) {
        return stateMap.toMap().entrySet().parallelStream()
                .filter(x -> x.getKey().startsWith(CURRENT_KEY_PREFIX))
                .map(Map.Entry::getValue)
                .collect(Collectors.toSet());
    }

    void restoreState(final ProcessContext context) throws IOException {
        final StateMap stateMap = context.getStateManager().getState(Scope.CLUSTER);
        if (stateMap.getVersion() == -1L || stateMap.get(CURRENT_TIMESTAMP) == null || stateMap.get(CURRENT_KEY_PREFIX+"0") == null) {
            currentTimestamp = 0L;
            currentKeys.clear();
        } else {
            currentTimestamp = Long.parseLong(stateMap.get(CURRENT_TIMESTAMP));
            currentKeys.clear();
            currentKeys.addAll(extractKeys(stateMap));
        }
    }

    void persistState(final ProcessContext context, final long timestamp, final Set<String> keys) {
        final Map<String, String> state = new HashMap<>();
        state.put(CURRENT_TIMESTAMP, String.valueOf(timestamp));

        int i = 0;
        for (final String key : keys) {
            state.put(CURRENT_KEY_PREFIX+i, key);
            i++;
        }

        try {
            context.getStateManager().setState(state, Scope.CLUSTER);
        } catch (IOException ioe) {
            getLogger().error("Failed to save cluster-wide state. If NiFi is restarted, data duplication may occur", ioe);
        }
    }

    Set<String> getStateKeys() {
        return Collections.unmodifiableSet(currentKeys);
    }

    long getStateTimestamp() {
        return currentTimestamp;
    }

    @Override
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        try {
            restoreState(context);
        } catch (IOException e) {
            getLogger().error("Failed to restore processor state; yielding", e);
            context.yield();
            return;
        }

        final long startNanos = System.nanoTime();

        final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions().getValue();
        final String prefix = context.getProperty(PREFIX).evaluateAttributeExpressions().getValue();
        final boolean useGenerations = context.getProperty(USE_GENERATIONS).asBoolean();

        final List<Storage.BlobListOption> listOptions = new ArrayList<>();
        if (prefix != null) {
            listOptions.add(Storage.BlobListOption.prefix(prefix));
        }
        if (useGenerations) {
            listOptions.add(Storage.BlobListOption.versions(true));
        }

        final Storage storage = getCloudService();

        long maxTimestamp = 0L;
        final Set<String> maxKeys = new HashSet<>();

        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);

        final BlobWriter writer;
        if (writerFactory == null) {
            writer = new AttributeBlobWriter(session);
        } else {
            writer = new RecordBlobWriter(session, writerFactory, getLogger());
        }

        try {
            writer.beginListing();

            Page<Blob> blobPage = storage.list(bucket, listOptions.toArray(new Storage.BlobListOption[0]));
            int listCount = 0;

            do {
                for (final Blob blob : blobPage.getValues()) {
                    long lastModified = blob.getUpdateTime();
                    if (lastModified < currentTimestamp || lastModified == currentTimestamp && currentKeys.contains(blob.getName())) {
                        continue;
                    }

                    writer.addToListing(blob);

                    // Update state
                    if (lastModified > maxTimestamp) {
                        maxTimestamp = lastModified;
                        maxKeys.clear();
                    }
                    if (lastModified == maxTimestamp) {
                        maxKeys.add(blob.getName());
                    }

                    listCount++;
                }

                if (writer.isCheckpoint()) {
                    commit(session, listCount);
                    listCount = 0;
                }

                blobPage = blobPage.getNextPage();
            } while (blobPage != null);

            writer.finishListing();
            commit(session, listCount);

            if (maxTimestamp != 0) {
                currentTimestamp = maxTimestamp;
                currentKeys.clear();
                currentKeys.addAll(maxKeys);
                persistState(context, currentTimestamp, currentKeys);
            } else {
                getLogger().debug("No new objects in GCS bucket {} to list. Yielding.", new Object[]{bucket});
                context.yield();
            }
        } catch (final Exception e) {
            getLogger().error("Failed to list contents of GCS Bucket due to {}", new Object[] {e}, e);
            writer.finishListingExceptionally(e);
            session.rollback();
            context.yield();
            return;
        }

        final long listMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
        getLogger().info("Successfully listed GCS bucket {} in {} millis", new Object[]{bucket, listMillis});
    }

    private void commit(final ProcessSession session, int listCount) {
        if (listCount > 0) {
            getLogger().info("Successfully listed {} new files from GCS; routing to success", new Object[] {listCount});
            session.commit();
        }
    }


    private interface BlobWriter {
        void beginListing() throws IOException, SchemaNotFoundException;

        void addToListing(Blob blob) throws IOException;

        void finishListing() throws IOException;

        void finishListingExceptionally(Exception cause);

        boolean isCheckpoint();
    }


    static class RecordBlobWriter implements BlobWriter {
        private static final RecordSchema RECORD_SCHEMA;

        public static final String BUCKET = "bucket";
        public static final String NAME = "name";
        public static final String SIZE = "size";
        public static final String CACHE_CONTROL = "cacheControl";
        public static final String COMPONENT_COUNT = "componentCount";
        public static final String CONTENT_DISPOSITION = "contentDisposition";
        public static final String CONTENT_ENCODING = "contentEncoding";
        public static final String CONTENT_LANGUAGE = "contentLanguage";
        public static final String CRC32C = "crc32c";
        public static final String CREATE_TIME = "createTime";
        public static final String UPDATE_TIME = "updateTime";
        public static final String ENCRYPTION_ALGORITHM = "encryptionAlgorithm";
        public static final String ENCRYPTION_KEY_SHA256 = "encryptionKeySha256";
        public static final String ETAG = "etag";
        public static final String GENERATED_ID = "generatedId";
        public static final String GENERATION = "generation";
        public static final String MD5 = "md5";
        public static final String MEDIA_LINK = "mediaLink";
        public static final String METAGENERATION = "metageneration";
        public static final String OWNER = "owner";
        public static final String OWNER_TYPE = "ownerType";
        public static final String URI = "uri";

        static {
            final List<RecordField> fields = new ArrayList<>();
            fields.add(new RecordField(BUCKET, RecordFieldType.STRING.getDataType(), false));
            fields.add(new RecordField(NAME, RecordFieldType.STRING.getDataType(), false));
            fields.add(new RecordField(SIZE, RecordFieldType.LONG.getDataType()));
            fields.add(new RecordField(CACHE_CONTROL, RecordFieldType.STRING.getDataType()));
            fields.add(new RecordField(COMPONENT_COUNT, RecordFieldType.INT.getDataType()));
            fields.add(new RecordField(CONTENT_DISPOSITION, RecordFieldType.LONG.getDataType()));
            fields.add(new RecordField(CONTENT_ENCODING, RecordFieldType.STRING.getDataType()));
            fields.add(new RecordField(CONTENT_LANGUAGE, RecordFieldType.STRING.getDataType()));
            fields.add(new RecordField(CRC32C, RecordFieldType.STRING.getDataType()));
            fields.add(new RecordField(CREATE_TIME, RecordFieldType.TIMESTAMP.getDataType()));
            fields.add(new RecordField(UPDATE_TIME, RecordFieldType.TIMESTAMP.getDataType()));
            fields.add(new RecordField(ENCRYPTION_ALGORITHM, RecordFieldType.STRING.getDataType()));
            fields.add(new RecordField(ENCRYPTION_KEY_SHA256, RecordFieldType.STRING.getDataType()));
            fields.add(new RecordField(ETAG, RecordFieldType.STRING.getDataType()));
            fields.add(new RecordField(GENERATED_ID, RecordFieldType.STRING.getDataType()));
            fields.add(new RecordField(GENERATION, RecordFieldType.LONG.getDataType()));
            fields.add(new RecordField(MD5, RecordFieldType.STRING.getDataType()));
            fields.add(new RecordField(MEDIA_LINK, RecordFieldType.STRING.getDataType()));
            fields.add(new RecordField(METAGENERATION, RecordFieldType.LONG.getDataType()));
            fields.add(new RecordField(OWNER, RecordFieldType.STRING.getDataType()));
            fields.add(new RecordField(OWNER_TYPE, RecordFieldType.STRING.getDataType()));
            fields.add(new RecordField(URI, RecordFieldType.STRING.getDataType()));

            RECORD_SCHEMA = new SimpleRecordSchema(fields);
        }


        private final ProcessSession session;
        private final RecordSetWriterFactory writerFactory;
        private final ComponentLog logger;
        private RecordSetWriter recordWriter;
        private FlowFile flowFile;

        public RecordBlobWriter(final ProcessSession session, final RecordSetWriterFactory writerFactory, final ComponentLog logger) {
            this.session = session;
            this.writerFactory = writerFactory;
            this.logger = logger;
        }

        @Override
        public void beginListing() throws IOException, SchemaNotFoundException {
            flowFile = session.create();

            final OutputStream out = session.write(flowFile);
            recordWriter = writerFactory.createWriter(logger, RECORD_SCHEMA, out, flowFile);
            recordWriter.beginRecordSet();
        }

        @Override
        public void addToListing(final Blob blob) throws IOException {
            recordWriter.write(createRecordForListing(blob));
        }

        @Override
        public void finishListing() throws IOException {
            final WriteResult writeResult = recordWriter.finishRecordSet();
            recordWriter.close();

            if (writeResult.getRecordCount() == 0) {
                session.remove(flowFile);
            } else {
                final Map<String, String> attributes = new HashMap<>(writeResult.getAttributes());
                attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
                flowFile = session.putAllAttributes(flowFile, attributes);

                session.transfer(flowFile, REL_SUCCESS);
            }
        }

        @Override
        public void finishListingExceptionally(final Exception cause) {
            try {
                recordWriter.close();
            } catch (IOException e) {
                logger.error("Failed to write listing as Records due to {}", new Object[] {e}, e);
            }

            session.remove(flowFile);
        }

        @Override
        public boolean isCheckpoint() {
            return false;
        }

        private Record createRecordForListing(final Blob blob) {
            final Map<String, Object> values = new HashMap<>();
            values.put(BUCKET, blob.getBucket());
            values.put(NAME, blob.getName());
            values.put(SIZE, blob.getSize());
            values.put(CACHE_CONTROL, blob.getCacheControl());
            values.put(COMPONENT_COUNT, blob.getComponentCount());
            values.put(CONTENT_DISPOSITION, blob.getContentDisposition());
            values.put(CONTENT_ENCODING, blob.getContentEncoding());
            values.put(CONTENT_LANGUAGE, blob.getContentLanguage());
            values.put(CRC32C, blob.getCrc32c());
            values.put(CREATE_TIME, blob.getCreateTime() == null ? null : new Timestamp(blob.getCreateTime()));
            values.put(UPDATE_TIME, blob.getUpdateTime() == null ? null : new Timestamp(blob.getUpdateTime()));

            final BlobInfo.CustomerEncryption encryption = blob.getCustomerEncryption();
            if (encryption != null) {
                values.put(ENCRYPTION_ALGORITHM, encryption.getEncryptionAlgorithm());
                values.put(ENCRYPTION_KEY_SHA256, encryption.getKeySha256());
            }

            values.put(ETAG, blob.getEtag());
            values.put(GENERATED_ID, blob.getGeneratedId());
            values.put(GENERATION, blob.getGeneration());
            values.put(MD5, blob.getMd5());
            values.put(MEDIA_LINK, blob.getMediaLink());
            values.put(METAGENERATION, blob.getMetageneration());

            final Acl.Entity owner = blob.getOwner();
            if (owner != null) {
                if (owner instanceof Acl.User) {
                    values.put(OWNER, ((Acl.User) owner).getEmail());
                    values.put(OWNER_TYPE, "user");
                } else if (owner instanceof Acl.Group) {
                    values.put(OWNER, ((Acl.Group) owner).getEmail());
                    values.put(OWNER_TYPE, "group");
                } else if (owner instanceof Acl.Domain) {
                    values.put(OWNER, ((Acl.Domain) owner).getDomain());
                    values.put(OWNER_TYPE, "domain");
                } else if (owner instanceof Acl.Project) {
                    values.put(OWNER, ((Acl.Project) owner).getProjectId());
                    values.put(OWNER_TYPE, "project");
                }
            }

            values.put(URI, blob.getSelfLink());

            return new MapRecord(RECORD_SCHEMA, values);
        }
    }



    /**
     * Writes Blobs by creating a new FlowFile for each blob and writing information as FlowFile attributes
     */
    private static class AttributeBlobWriter implements BlobWriter {
        private final ProcessSession session;

        public AttributeBlobWriter(final ProcessSession session) {
            this.session = session;
        }

        @Override
        public void beginListing() {
        }

        @Override
        public void addToListing(final Blob blob) {
            final Map<String, String> attributes = StorageAttributes.createAttributes(blob);

            // Create the flowfile
            FlowFile flowFile = session.create();
            flowFile = session.putAllAttributes(flowFile, attributes);
            session.transfer(flowFile, REL_SUCCESS);
        }

        @Override
        public void finishListing() {
        }

        @Override
        public void finishListingExceptionally(final Exception cause) {
        }

        @Override
        public boolean isCheckpoint() {
            return true;
        }
    }
}
